package kafka

import (
	"github.com/Shopify/sarama"
)

// Client is the Kafka client abstraction exposed by this package. It hands out
// producers and consumers and controls the lifetime of the client itself.
type Client interface {
	// Factories.

	// SyncProducer returns a synchronous producer, or an error if one
	// could not be created.
	SyncProducer() (SyncProducer, error)
	// Consumer returns a consumer, or an error if one could not be created.
	Consumer() (Consumer, error)
	// ConsumerGroup returns a consumer group for the given group ID, or an
	// error if one could not be created.
	ConsumerGroup(groupID string) (ConsumerGroup, error)

	// Lifecycle.

	// Closed reports whether the client is closed.
	Closed() bool
	// Close closes the client and returns any error from doing so.
	Close() error
}

// client wraps a sarama.Client; the methods declared on it in this file
// forward to that wrapped value.
type client struct {
	client sarama.Client // underlying sarama client used by every method on this type
}

// Close closes the wrapped sarama client and passes its result straight back
// to the caller.
func (d client) Close() error {
	closeErr := d.client.Close()
	return closeErr
}

// Closed reports the closed state of the wrapped sarama client.
func (d client) Closed() bool {
	isClosed := d.client.Closed()
	return isClosed
}

// ConsumerGroup creates a sarama consumer group for groupID on top of the
// wrapped client and returns it wrapped in this package's consumerGroup type.
//
// If sarama fails to create the group, the error is returned unchanged and the
// returned ConsumerGroup is nil.
func (d client) ConsumerGroup(groupID string) (ConsumerGroup, error) {
	sGroup, err := sarama.NewConsumerGroupFromClient(groupID, d.client)
	if err != nil {
		return nil, err
	}
	// Return a literal nil on success rather than the (known-nil) err
	// variable, matching Consumer and SyncProducer.
	return &consumerGroup{
		consumerGroup: sGroup,
	}, nil
}

// Consumer creates a sarama consumer on top of the wrapped client and returns
// it wrapped in this package's consumer type. A creation error is returned
// unchanged together with a nil Consumer.
func (d client) Consumer() (Consumer, error) {
	raw, err := sarama.NewConsumerFromClient(d.client)
	if err != nil {
		return nil, err
	}

	wrapped := &consumer{consumer: raw}
	return wrapped, nil
}

// SyncProducer creates a sarama synchronous producer on top of the wrapped
// client and returns it wrapped in this package's syncProducer type. A
// creation error is returned unchanged together with a nil SyncProducer.
func (d client) SyncProducer() (SyncProducer, error) {
	raw, err := sarama.NewSyncProducerFromClient(d.client)
	if err != nil {
		return nil, err
	}

	wrapped := &syncProducer{producer: raw}
	return wrapped, nil
}

// NewClient initializes a Kafka client. The implementation below is currently
// disabled (commented out).
/*func NewClient(addrs []string, config *Config) (Client, error) {
	sConfig := saramaConfig(config)

	sClient, err := sarama.NewClient(addrs, sConfig)
	if err != nil {
		return nil, err
	}
	return client{
		client: sClient,
	}, nil
}*/

// NewClientWithPass initializes a Kafka client that carries SASL credentials.
//
// addrs is passed to sarama.NewClient as the address list. config and
// SaslConfig are first converted into a sarama configuration by saramaConfig;
// the SASL enable flag, user and password are then set explicitly from
// SaslConfig.
//
// A nil SaslConfig yields a sarama.ConfigurationError instead of a nil-pointer
// panic. Any error from sarama.NewClient is returned unchanged, with a nil
// Client.
func NewClientWithPass(addrs []string, config *Config, SaslConfig *KaConfig) (Client, error) {
	// SaslConfig is dereferenced below, so reject nil up front.
	if SaslConfig == nil {
		return nil, sarama.ConfigurationError("SASL config must not be nil")
	}

	sConfig := saramaConfig(config, SaslConfig)
	sConfig.Net.SASL.Enable = SaslConfig.SaslEnable
	sConfig.Net.SASL.User = SaslConfig.User
	sConfig.Net.SASL.Password = SaslConfig.Password

	sClient, err := sarama.NewClient(addrs, sConfig)
	if err != nil {
		return nil, err
	}
	return client{
		client: sClient,
	}, nil
}
